package com.marion.micro.app.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * Minimal Kafka demo service: publishes to and (optionally) consumes from the {@link #TOPIC} topic.
 *
 * <p>Command-line helpers for preparing the topic:
 * <ol>
 *   <li>Create the topic:
 *       {@code kafka-topics --bootstrap-server :9092 --create --replication-factor 1 --partitions 1 --topic demo}</li>
 *   <li>List topics: {@code kafka-topics --bootstrap-server :9092 --list}</li>
 * </ol>
 */
@Service
public class KafkaService {

    /** Name of the topic this service writes to (and would read from, once the listener is enabled). */
    public static final String TOPIC = "demo";

    /**
     * Timestamp layout used in the consumer's console output.
     * {@link DateTimeFormatter} is immutable and thread-safe, so one shared instance
     * replaces the per-message {@code ofPattern} call.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Produces one message: sends the current epoch time in milliseconds, as a string,
     * to {@link #TOPIC}.
     *
     * <p>The value returned by {@code send} is discarded, so this method does not observe
     * whether the send ultimately succeeds.
     */
    public void produce() {
        kafkaTemplate.send(TOPIC, String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Consumes one message: prints its partition, offset and value to standard output,
     * sleeps for one second, then acknowledges the record.
     *
     * <p>The {@code @KafkaListener} registration below is commented out, so as written this
     * method is only invoked when called directly.
     * NOTE(review): receiving an {@link Acknowledgment} presumably requires the listener
     * container to use a manual ack mode — confirm against the consumer configuration.
     *
     * @param record the received Kafka record
     * @param ack    handle used to acknowledge the record after processing
     * @throws InterruptedException if the thread is interrupted during the one-second sleep;
     *                              in that case the record is not acknowledged
     */
    //@KafkaListener(topics = {TOPIC}, groupId = "app")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
        String receivedAt = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        // NOTE: the emitted text has no separator between the offset and "value:";
        // it is kept unchanged so the output format stays exactly as before.
        System.out.println(receivedAt + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
        // Presumably simulates slow message processing — confirm the intent before removing.
        TimeUnit.SECONDS.sleep(1);
        // Acknowledge only after the (simulated) processing has finished.
        ack.acknowledge();
    }

}
